Python Databricks DataFrame Nested Arrays in PySpark: Guidelines

In this article, we will see how to work with nested arrays in a PySpark DataFrame on Databricks, including how to define a schema for nested data and how to transform fields inside nested structures.

Towards the end of this article, we will also cover best practices to keep in mind when working with PySpark DataFrame transformations and handling arrays, to ensure efficient and effective data processing.

Below is a sample JSON document that contains a mix of array fields and nested objects:

[
  {
    "name": "Alice",
    "date_field": "2022-03-30",
    "area": {
      "city": {
        "city_code": "asdas",
        "date_field": "2022-03-30"
      },
      "projects": [
        {
          "area_code": "sdas",
          "date_field": "2022-03-30"
        }
      ]
    }
  }
]
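
Before defining a Spark schema, it can help to confirm the shape of the sample in plain Python. The sketch below is only an illustration: the `describe` helper is a hypothetical name introduced here, and it simply reports whether each node is an object, an array, or a scalar. Objects correspond to StructType and arrays to ArrayType in a PySpark schema.

```python
import json

# The sample document from above, embedded as a string for illustration.
sample = """
[
  {
    "name": "Alice",
    "date_field": "2022-03-30",
    "area": {
      "city": {"city_code": "asdas", "date_field": "2022-03-30"},
      "projects": [{"area_code": "sdas", "date_field": "2022-03-30"}]
    }
  }
]
"""

records = json.loads(sample)


def describe(value, path="$"):
    """Return 'path: kind' lines for every node of a parsed JSON value."""
    if isinstance(value, dict):
        lines = [f"{path}: object"]
        for key, child in value.items():
            lines += describe(child, f"{path}.{key}")
        return lines
    if isinstance(value, list):
        lines = [f"{path}: array"]
        for index, child in enumerate(value):
            lines += describe(child, f"{path}[{index}]")
        return lines
    # Anything else is a scalar leaf (str, int, float, bool, None).
    return [f"{path}: {type(value).__name__}"]


shape = describe(records)
print("\n".join(shape))
```

The output makes it easy to see that `area.city` is an object while `area.projects` is an array of objects, which is the distinction the schema has to capture.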

PySpark DataFrame transformations

PySpark DataFrame transformations involve operations used to manipulate data within DataFrames.

There are various common use cases where these transformations can be applied.

  1. Filtering Data: Use the filter() or where() functions to keep only the rows that match a condition.
  2. Selecting Columns: Use the select() function to choose specific columns from the DataFrame. This is useful when you only need certain columns for further processing or analysis.
  3. Grouping and Aggregating: Use functions like groupBy() and agg() to group data based on one or more columns and perform aggregations such as sum, count, average, etc.
  4. Joining DataFrames: Use the join() function to combine two DataFrames based on a common key.
  5. Sorting Data: Use the orderBy() or sort() functions to sort the DataFrame based on one or more columns.
  6. Adding or Removing Columns: Use functions like withColumn() and drop() to add new columns to the DataFrame or remove existing columns, respectively.
  7. String Manipulation: Use functions like substring(), trim(), lower(), upper(), etc., to perform string operations on DataFrame columns.
  8. Date and Time Manipulation: Use functions like to_date(), year(), month(), dayofmonth(), etc., from the pyspark.sql.functions module to work with date and time columns.

If you have a basic data source and need to transform only a few fields, for example by performing date and time manipulation, you can try the steps below to achieve the transformation.

Define StructType schema in PySpark

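# Define the schema (field names must match the keys in the JSON)
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("date_field", StringType(), True),
    # "area" is a nested object, so it is declared as a StructType
    StructField("area", StructType([
        StructField("city", StructType([
            StructField("city_code", StringType(), True),
            StructField("date_field", StringType(), True)
        ]), True),
        # "projects" is an array of objects: ArrayType wrapping a StructType
        StructField("projects", ArrayType(StructType([
            StructField("area_code", StringType(), True),
            StructField("date_field", StringType(), True)
        ])), True)
    ]))
])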

Modify date field datatype in DataFrame schema

Update the schema as shown below so that each date field is declared as a timestamp type instead of a string type:

StructField("date_field", TimestampType(), True)

# Define the schema with date fields as timestamps
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("date_field", TimestampType(), True),
    StructField("area", StructType([
        StructField("city", StructType([
            # Field names must match the JSON keys (city_code, area_code)
            StructField("city_code", StringType(), True),
            StructField("date_field", TimestampType(), True)
        ]), True),
        StructField("projects", ArrayType(StructType([
            StructField("area_code", StringType(), True),
            StructField("date_field", TimestampType(), True)
        ])), True)
    ]))
])

Convert JSON list to JSON string with indentation

# Convert the JSON list to a JSON string with indentation
import json

json_string = json.dumps(json_list, indent=2)
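
Two details of this step are easy to miss, and the plain-Python sketch below illustrates both under simple assumptions (the small `json_list` here is a trimmed stand-in for the sample data). First, `indent` only changes whitespace, not the data. Second, a Python string is itself iterable, so an API that expects a collection of documents, such as SparkContext.parallelize, should be given a one-element list rather than the bare string.

```python
import json

# Trimmed stand-in for the sample data, used only for this illustration.
json_list = [{"name": "Alice", "area": {"projects": [{"area_code": "sdas"}]}}]

compact = json.dumps(json_list)
pretty = json.dumps(json_list, indent=2)

# Both strings decode to the same data; indent only adds whitespace.
same_data = json.loads(compact) == json.loads(pretty) == json_list

# Iterating a str yields single characters, so passing the bare string to a
# collection-based API would produce one element per character.
as_characters = list(pretty)[:3]

# Wrapping the string in a list gives exactly one element: the whole document.
as_one_document = [pretty]

print(len(compact), len(pretty), same_data)
print(as_characters, len(as_one_document))
```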



import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType
from pyspark.sql.functions import col, explode, to_date

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Transform JSON Data") \
    .getOrCreate()

# `schema` is the StructType defined above; `json_list` is the sample JSON
# parsed into a Python list (for example with json.loads)

# Convert the JSON list to a JSON string with indentation
json_string = json.dumps(json_list, indent=2)

# Create DataFrame from JSON data with defined schema.
# parallelize() needs a list: a bare string would be split into characters.
df = spark.read.schema(schema).json(spark.sparkContext.parallelize([json_string]))

# Write DataFrame to destination.
# "destination" is a placeholder: substitute a real format and its options or path.
df.write.format("destination").mode("append").save()

# Stop SparkSession
spark.stop()

The above is a generic implementation and can be used to push the data to any destination as required, including MongoDB, SQL databases, and so on.

Approach 2: Explode nested array in DataFrame

You can also use the DataFrame explode function to flatten the nested array into one row per element, and then convert the string date fields to dates with to_date, as shown in the example below.

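# Apply transformations to nested fields.
# withColumn("a.b.c", ...) would add a new top-level column literally named
# "a.b.c", so nested fields are updated with Column.withField (Spark 3.1+).

df_transformed = df \
    .withColumn("date_field", to_date(col("date_field"))) \
    .withColumn("area", col("area").withField(
        "city.date_field", to_date(col("area.city.date_field")))) \
    .withColumn("project", explode(col("area.projects"))) \
    .withColumn("project", col("project").withField(
        "date_field", to_date(col("project.date_field"))))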

Do you have any comments or ideas or any better suggestions to share?

Please sound off your comments below.

Happy Coding !!
